/****************************************************
 * 创建人：@author fengxin    
 * 创建时间: 2019/12/19/9:26
 * 项目名称: risk
 * 文件名称: Stream2Stream.java
 * 文件描述: 
 *
 * All rights Reserved, Designed By 投资交易团队
 * @Copyright:2016-2019
 *
 ********************************************************/
package com.learn.zmq.stream;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import zmq.Msg;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
 * Example of two {@code SocketType.STREAM} sockets talking to each other inside one process.
 *
 * <p>{@link #main(String[])} starts a server thread that binds {@code tcp://*:12307} and, one
 * second later, a client thread that connects to {@code tcp://localhost:12307}. The server prints
 * every message it receives and sends {@link #standardGreeting} to each peer it has not seen
 * before; the client reads two messages from the server and then sends a numbered
 * {@code "hello"} every three seconds.
 *
 * <p>For the meaning of the socket options, see
 * <a href="https://www.cnblogs.com/fengbohello/p/4398953.html">this setsockopt reference</a>.
 *
 * <p>Created: 2019/12/19 9:26
 *
 * @author fengxin
 */
@Deprecated
public class Stream2Stream {
    /**
     * 64-byte greeting frame the server sends to every newly seen peer.
     *
     * <p>NOTE(review): the layout filled in below (0xFF ... 0x7F signature bytes, version byte 3,
     * mechanism name "NULL") looks like a ZMTP 3.0 greeting with the NULL security mechanism --
     * confirm against the ZMTP specification.
     */
    public static final byte[] standardGreeting = new byte[64];

    static {
        standardGreeting[0] = (byte) 0xff;
        standardGreeting[8] = 1;
        standardGreeting[9] = 0x7f;
        standardGreeting[10] = 3;
        standardGreeting[12] = 'N';
        standardGreeting[13] = 'U';
        standardGreeting[14] = 'L';
        standardGreeting[15] = 'L';
    }

    /**
     * Reads the "Peer-Address" metadata entry of a received frame.
     *
     * @param frame a frame returned by {@code zmq.ZMQ.recv}; must not be {@code null}
     * @return the peer address, or {@code null} when the frame carries no metadata
     */
    private static String peerAddressOf(Msg frame)
    {
        // Metadata is optional on a frame, so it must be null-checked before the lookup.
        return frame.getMetadata() == null ? null : frame.getMetadata().get("Peer-Address");
    }

    /**
     * Receives the payload frame that follows an identity frame, plus any further frames flagged
     * with "more", and prints one line per frame.
     *
     * @param socket     socket to receive from
     * @param remoteRole word inserted into the log line to name the sender ("client" or "server")
     * @param peerAddr   peer address to print; may be {@code null}
     * @param printBody  whether to also print each frame's bytes decoded with {@code ZMQ.CHARSET}
     * @return {@code true} when all frames were received, {@code false} when a receive returned
     *         {@code null}
     */
    private static boolean drainPayload(ZMQ.Socket socket, String remoteRole, String peerAddr, boolean printBody)
    {
        Msg frame;
        do {
            frame = zmq.ZMQ.recv(socket.base(), 0);
            if (frame == null) {
                return false;
            }
            System.out.printf("received msg from %s address: %s size: %d%n", remoteRole, peerAddr, frame.size());
            if (printBody) {
                System.out.println(new String(frame.data(), ZMQ.CHARSET));
            }
        } while (frame.hasMore());
        return true;
    }

    /**
     * Server side of the example: binds a STREAM socket, prints whatever arrives and greets each
     * new peer once.
     */
    private static class StreamServer extends Thread
    {
        private final ZContext context;

        /** Endpoint reported by the socket after binding; written by {@link #run()}. */
        public String host;

        private StreamServer(ZContext context)
        {
            this.context = context;
        }

        /**
         * Binds to {@code tcp://*:12307} and processes incoming messages until the thread is
         * interrupted or a receive fails.
         */
        @Override
        public void run()
        {
            ZMQ.Socket streamer = context.createSocket(SocketType.STREAM);
            zmq.ZMQ.setSocketOption(streamer.base(), zmq.ZMQ.ZMQ_LINGER, 0);
            streamer.bind("tcp://*:12307");
            host = (String) zmq.ZMQ.getSocketOptionExt(streamer.base(), zmq.ZMQ.ZMQ_LAST_ENDPOINT);
            System.out.println("server address is: " + host);

            int count = 0;
            // Keys of the peers that have already been sent the greeting.
            Set<String> greetedPeers = new HashSet<>();

            while (!Thread.currentThread().isInterrupted()) {
                // First frame of every message: the identity of the sending peer.
                Msg idmsg = zmq.ZMQ.recv(streamer.base(), 0);
                if (idmsg == null) {
                    // recv returns null on failure; stop instead of dereferencing it.
                    break;
                }

                count++;
                System.out.println("server received " + count + " msgs");

                String peerAddr = peerAddressOf(idmsg);

                // Remaining frame(s): the data sent by that peer.
                if (!drainPayload(streamer, "client", peerAddr, true)) {
                    break;
                }

                // Fall back to the identity bytes when no address is available, so that
                // address-less peers are not all folded into a single entry.
                String peerKey = peerAddr != null ? peerAddr : Arrays.toString(idmsg.data());
                if (greetedPeers.add(peerKey)) {
                    // New peer: remember it and send the greeting. Per the original author's
                    // note, the client cannot start receiving messages without it.
                    streamer.send(idmsg.data(), ZMQ.SNDMORE);
                    streamer.send(standardGreeting, 0);
                }
            }
        }
    }

    /**
     * Client side of the example: connects a STREAM socket, reads two messages from the server
     * and then keeps sending numbered hello messages.
     */
    private static class StreamClient extends Thread
    {
        private final ZContext context;

        private StreamClient(ZContext context)
        {
            this.context = context;
        }

        /**
         * Connects to {@code tcp://localhost:12307}, consumes the two initial messages and then
         * sends {@code "<n> hello"} every three seconds until the thread is interrupted.
         */
        @Override
        public void run()
        {
            ZMQ.Socket streamer = context.createSocket(SocketType.STREAM);
            streamer.connect("tcp://localhost:12307");

            // Phase 1 (original author's label: "connection established").
            Msg idmsg = receiveFromServer(streamer, 1);
            if (idmsg == null) {
                return;
            }

            // Phase 2 (original author's label: "greeting").
            idmsg = receiveFromServer(streamer, 2);
            if (idmsg == null) {
                return;
            }

            // Phase 3: send messages in a loop, each addressed with the identity frame
            // received above.
            int count = 0;
            while (!Thread.currentThread().isInterrupted()) {
                count++;
                zmq.ZMQ.send(streamer.base(), idmsg, zmq.ZMQ.ZMQ_SNDMORE);
                zmq.ZMQ.send(streamer.base(), count + " hello", 0);
                ZMQ.sleep(3);
            }
        }

        /**
         * Receives one complete message (identity frame followed by data frames) and logs it.
         *
         * @param streamer socket to receive from
         * @param ordinal  running number printed in the "client received" line
         * @return the identity frame, or {@code null} when any receive returned {@code null}
         */
        private static Msg receiveFromServer(ZMQ.Socket streamer, int ordinal)
        {
            // First frame: identity.
            Msg idmsg = zmq.ZMQ.recv(streamer.base(), 0);
            if (idmsg == null) {
                return null;
            }
            String peerAddr = peerAddressOf(idmsg);
            System.out.println("client received " + ordinal + " msgs");

            // Second frame onwards: the actual message data.
            return drainPayload(streamer, "server", peerAddr, false) ? idmsg : null;
        }
    }

    /**
     * Runs the example: starts the server, waits one second, starts the client and then waits for
     * both threads to finish.
     *
     * @param argv unused
     * @throws InterruptedException if the main thread is interrupted while joining the workers
     */
    public static void main(String[] argv)  throws InterruptedException
    {
        try (ZContext ctx = new ZContext()) {
            Thread serverThread = new StreamServer(ctx);
            Thread clientThread = new StreamClient(ctx);

            serverThread.start();

            // Give the server a head start before the client connects.
            ZMQ.sleep(1);

            clientThread.start();

            serverThread.join();
            clientThread.join();
        }
    }
}


